package backtype.storm.testing;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.topology.base.BaseTransactionalBolt;
import backtype.storm.transactional.ICommitter;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;

/**
 * Transactional bolt that counts the tuples handed to {@link #execute(Tuple)}
 * and, when {@link #finishBatch()} is invoked, emits a single tuple pairing
 * the transaction attempt received in {@code prepare} with that count.
 *
 * <p>The class also implements {@link ICommitter}; no additional methods are
 * defined here for that interface.
 */
public class CountingCommitBolt extends BaseTransactionalBolt implements
		ICommitter {
	/** Collector captured in {@code prepare}; used to emit the final count. */
	BatchOutputCollector _collector;
	/** Transaction attempt captured in {@code prepare}; emitted as "tx". */
	TransactionAttempt _id;
	/** Number of tuples seen by {@code execute} so far. */
	int _count = 0;

	/**
	 * Remembers the collector and transaction attempt for later use.
	 *
	 * @param conf      configuration map (not used by this bolt)
	 * @param context   topology context (not used by this bolt)
	 * @param collector collector through which the count is emitted
	 * @param id        transaction attempt emitted alongside the count
	 */
	@Override
	public void prepare(Map conf, TopologyContext context,
			BatchOutputCollector collector, TransactionAttempt id) {
		this._collector = collector;
		this._id = id;
	}

	/**
	 * Registers one more tuple; the tuple's contents are not inspected.
	 *
	 * @param tuple the incoming tuple (only its arrival is counted)
	 */
	@Override
	public void execute(Tuple tuple) {
		this._count += 1;
	}

	/**
	 * Emits one tuple holding the stored transaction attempt and the number
	 * of tuples counted so far.
	 */
	@Override
	public void finishBatch() {
		final Values result = new Values(this._id, this._count);
		this._collector.emit(result);
	}

	/**
	 * Declares the two output fields, {@code "tx"} and {@code "count"}, in
	 * the same order as the values emitted by {@link #finishBatch()}.
	 *
	 * @param declarer declarer that receives the output schema
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		final Fields schema = new Fields("tx", "count");
		declarer.declare(schema);
	}

}
